﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using QQ2564874169.Core.Lock;
using QQ2564874169.Core.Utils;

namespace QQ2564874169.MessageQueue
{
    /// <summary>
    /// Contract for a service that consumes messages of the queue described by
    /// <typeparamref name="T"/> and receives the message payload as a raw string.
    /// </summary>
    /// <typeparam name="T">
    /// Queue descriptor type; must derive from <see cref="ToMq"/> and expose a public
    /// parameterless constructor.
    /// </typeparam>
    public interface IConsumeModel<T> where T : ToMq, new()
    {
        /// <summary>Handles a single message.</summary>
        /// <param name="message">The message payload as text (not deserialized).</param>
        void Consume(string message);
    }

    /// <summary>
    /// Contract for a service that consumes messages of the queue described by
    /// <typeparamref name="T"/> and receives the payload as an instance of
    /// <typeparamref name="MT"/>.
    /// </summary>
    /// <typeparam name="T">
    /// Queue descriptor type; must derive from <see cref="ToMq"/> and expose a public
    /// parameterless constructor.
    /// </typeparam>
    /// <typeparam name="MT">The model type the message payload is deserialized into.</typeparam>
    public interface IConsumeModel<T, MT> where T : ToMq, new()
    {
        /// <summary>Handles a single message.</summary>
        /// <param name="model">The deserialized message payload.</param>
        void Consume(MT model);
    }

    /// <summary>
    /// Event data describing a failure that occurred while dispatching a queue message
    /// to a consumer service. All properties are populated by this assembly only.
    /// </summary>
    public class SubscribeModelErrorEventArgs : EventArgs
    {
        /// <summary>The consumer service instance involved in the failure, when one is available.</summary>
        public object Service { get; internal set; }

        /// <summary>The model that was (or was about to be) passed to the service, when one is available.</summary>
        public object Model { get; internal set; }

        /// <summary>The name of the subscribed queue the message was dispatched for.</summary>
        public string Queue { get; internal set; }

        /// <summary>The exception that was caught.</summary>
        public Exception Error { get; internal set; }
    }

    /// <summary>
    /// Discovers services implementing <see cref="IConsumeModel{T}"/> or
    /// <see cref="IConsumeModel{T, MT}"/>, subscribes to the queues they declare and
    /// dispatches every received message to each matching service on a background task.
    /// </summary>
    public class SubscribeModelFromMq
    {
        // Name of the single method declared by both IConsumeModel interfaces.
        private const string ConsumeMethodName = "Consume";

        /// <summary>
        /// Raised when dispatching a message to a service fails. Exceptions thrown by the
        /// service itself arrive wrapped in a
        /// <see cref="TargetInvocationException"/> because the call is made via reflection.
        /// </summary>
        public event EventHandler<SubscribeModelErrorEventArgs> Error;

        /// <summary>
        /// Gets or sets the <c>Max</c> value of the underlying lockspace (initially 10).
        /// NOTE(review): presumably the maximum number of handlers running in parallel —
        /// confirm against <c>ParallelLockspace</c>.
        /// </summary>
        public int ParallelMax
        {
            get { return _lockspace.Max; }
            set { _lockspace.Max = value; }
        }

        private readonly IList<SettingContext> _settings = new List<SettingContext>();
        private readonly IList<IMqSubscriber> _subscribers = new List<IMqSubscriber>();
        private readonly Func<Type, string, object> _deserialize;
        private readonly Func<IResolver> _getResolver;
        private readonly ParallelLockspace _lockspace = new ParallelLockspace(10);

        /// <summary>Creates a dispatcher.</summary>
        /// <param name="createResolver">
        /// Factory invoked once per dispatched message; the returned resolver is used to
        /// obtain the service instance and is disposed afterwards.
        /// </param>
        /// <param name="deserialize">
        /// Converts a text payload into an instance of the requested type.
        /// </param>
        /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
        public SubscribeModelFromMq(Func<IResolver> createResolver, Func<Type, string, object> deserialize)
        {
            if (createResolver == null)
            {
                throw new ArgumentNullException(nameof(createResolver));
            }
            if (deserialize == null)
            {
                throw new ArgumentNullException(nameof(deserialize));
            }

            _getResolver = createResolver;
            _deserialize = deserialize;
        }

        /// <summary>
        /// Detaches from and disposes every active subscriber, then blocks the calling
        /// thread until the lockspace reports that nothing is running any more.
        /// </summary>
        public void Stop()
        {
            foreach (var sub in _subscribers)
            {
                sub.Received -= Subscriber_OnReceived;
                sub.Dispose();
            }
            _subscribers.Clear();

            // Poll once per second until in-flight handlers have released their slots.
            while (_lockspace.Running)
            {
                TaskHelper.Delay(1000).Wait();
            }
        }

        /// <summary>Raises the <see cref="Error"/> event.</summary>
        /// <param name="e">Details of the failure.</param>
        protected virtual void OnError(SubscribeModelErrorEventArgs e)
        {
            Error?.Invoke(this, e);
        }

        /// <summary>
        /// Registers every (service, queue, model) combination declared by the given types
        /// through <see cref="IConsumeModel{T}"/> or <see cref="IConsumeModel{T, MT}"/>.
        /// Only concrete, non-generic classes are considered; <c>null</c> entries and
        /// combinations that are already registered are ignored.
        /// </summary>
        /// <param name="types">Candidate service types.</param>
        /// <exception cref="ArgumentNullException"><paramref name="types"/> is <c>null</c>.</exception>
        public void LoadService(params Type[] types)
        {
            if (types == null)
            {
                throw new ArgumentNullException(nameof(types));
            }

            var stringContract = typeof(IConsumeModel<>);
            var typedContract = typeof(IConsumeModel<,>);
            var candidates = types
                .Where(t => t != null && t.IsClass && !t.IsGenericType && !t.IsAbstract)
                .ToArray();

            foreach (var serviceType in candidates)
            {
                foreach (var contract in serviceType.GetInterfaces().Where(i => i.IsGenericType))
                {
                    var definition = contract.GetGenericTypeDefinition();
                    var isTyped = definition == typedContract;
                    if (!isTyped && definition != stringContract)
                    {
                        continue;
                    }

                    var args = contract.GetGenericArguments();
                    var mqType = args[0];
                    // A null model type marks the raw-string contract.
                    var modelType = isTyped ? args[1].AssemblyQualifiedName : null;

                    // Check for duplicates first so the queue descriptor is only
                    // instantiated when a new registration is actually needed.
                    if (_settings.Any(s => s.ModelType == modelType && s.ServiceType == serviceType && s.Mq == mqType))
                    {
                        continue;
                    }

                    var mq = Activator.CreateInstance(mqType) as ToMq;
                    if (mq == null)
                    {
                        continue;
                    }

                    _settings.Add(new SettingContext
                    {
                        ModelType = modelType,
                        ServiceType = serviceType,
                        Mq = mqType,
                        SubscribeQueue = mq.SubscribeQueue
                    });
                }
            }
        }

        /// <summary>
        /// Starts consuming: creates one queue descriptor per distinct registered queue
        /// type, hooks its subscriber and begins consumption of its queue.
        /// </summary>
        public void Start()
        {
            // One subscription per queue type is enough: the receive handler already fans
            // a message out to every service registered for that queue, so subscribing
            // once per service would only add redundant consumers/handlers.
            var mqTypes = _settings.Select(s => s.Mq).Distinct().ToArray();

            foreach (var mqType in mqTypes)
            {
                var mq = (ToMq) Activator.CreateInstance(mqType);
                // Read the property once so the same subscriber instance is hooked,
                // started and tracked for disposal.
                var subscriber = mq.Subscriber;

                subscriber.Received += Subscriber_OnReceived;
                subscriber.Consume(mq.SubscribeQueue, mq);
                _subscribers.Add(subscriber);
            }
        }

        /// <summary>
        /// Receive handler: unwraps the message envelope and schedules one background
        /// task per service registered for the envelope's queue.
        /// </summary>
        private void Subscriber_OnReceived(MqMessage msg)
        {
            var envelope = _deserialize(typeof(ModelMessage), msg.Content) as ModelMessage;
            if (envelope == null)
            {
                return;
            }

            var contexts = _settings
                .Where(s => s.SubscribeQueue == envelope.SQueue)
                .Select(s => new ExecContext
                {
                    Setting = s,
                    Model = envelope.Model,
                    ModelType = s.ModelType
                })
                .ToArray();

            foreach (var context in contexts)
            {
                // Acquire a lockspace slot before scheduling.
                // NOTE(review): presumably blocks while ParallelMax handlers are running — confirm.
                context.Locker = _lockspace.Lock();
                try
                {
                    Task.Factory.StartNew(state => RunAndRelease((ExecContext) state), context);
                }
                catch
                {
                    // The task never started, so nobody else will release the slot.
                    context.Locker.Dispose();
                    throw;
                }
            }
        }

        /// <summary>Runs one dispatch and always releases its lockspace slot afterwards.</summary>
        private void RunAndRelease(ExecContext context)
        {
            try
            {
                Exec(context);
            }
            finally
            {
                context.Locker.Dispose();
            }
        }

        /// <summary>
        /// Resolves the target service, prepares the model (raw string or deserialized
        /// instance) and invokes the service's <c>Consume</c> method. Every failure along
        /// the way is reported through <see cref="OnError"/> instead of faulting the task.
        /// </summary>
        private void Exec(ExecContext context)
        {
            IResolver resolver;
            try
            {
                resolver = _getResolver();
            }
            catch (Exception ex)
            {
                ReportError(context, null, null, ex);
                return;
            }

            using (resolver)
            {
                object service = null;
                object model = null;
                try
                {
                    service = resolver.Resolve(context.Setting.ServiceType);
                    MethodInfo method;

                    if (context.ModelType == null)
                    {
                        // Raw-string contract: hand the payload over untouched.
                        model = context.Model;
                        method = typeof(IConsumeModel<>)
                            .MakeGenericType(context.Setting.Mq)
                            .GetMethod(ConsumeMethodName);
                    }
                    else
                    {
                        var modelType = Type.GetType(context.ModelType, false);
                        if (modelType == null)
                        {
                            throw new TypeLoadException(
                                "Unable to resolve model type '" + context.ModelType + "'.");
                        }
                        model = _deserialize(modelType, context.Model);
                        method = typeof(IConsumeModel<,>)
                            .MakeGenericType(context.Setting.Mq, modelType)
                            .GetMethod(ConsumeMethodName);
                    }

                    method.Invoke(service, new[] { model });
                }
                catch (Exception ex)
                {
                    // Reported while the resolver is still alive so handlers can
                    // inspect the service instance.
                    ReportError(context, service, model, ex);
                }
            }
        }

        /// <summary>Builds the event data for a failed dispatch and raises the error event.</summary>
        private void ReportError(ExecContext context, object service, object model, Exception error)
        {
            OnError(new SubscribeModelErrorEventArgs
            {
                Model = model,
                Error = error,
                Queue = context.Setting.SubscribeQueue,
                Service = service
            });
        }

        /// <summary>One registered (service, queue, model) combination.</summary>
        private class SettingContext
        {
            /// <summary>Assembly-qualified model type name, or <c>null</c> for the raw-string contract.</summary>
            public string ModelType { get; set; }

            /// <summary>The consumer service type to resolve.</summary>
            public Type ServiceType { get; set; }

            /// <summary>The queue descriptor type (derives from <see cref="ToMq"/>).</summary>
            public Type Mq { get; set; }

            /// <summary>The queue name read from the descriptor at registration time.</summary>
            public string SubscribeQueue { get; set; }
        }

        /// <summary>State carried into the background task for a single dispatch.</summary>
        private class ExecContext
        {
            /// <summary>The text payload taken from the message envelope.</summary>
            public string Model { get; set; }

            /// <summary>Copy of <see cref="SettingContext.ModelType"/> for this dispatch.</summary>
            public string ModelType { get; set; }

            /// <summary>The registration this dispatch targets.</summary>
            public SettingContext Setting { get; set; }

            /// <summary>The lockspace slot held for this dispatch; disposed when it finishes.</summary>
            public IDisposable Locker { get; set; }
        }
    }
}
